package com.luguangxing.utils;

import java.nio.charset.StandardCharsets;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author: 小白一号
 * @date: 2022/12/12
 * @desc:
 */
@Slf4j
@RestController
public class MqttUtil {
    private static String broker = "tcp://172.17.0.1:1883";
    private static String clientId = "emqx_test";
    private static String userName = "whiteone";
    private static String password = "white1@lu";
    public static MqttClient client;

    public static void mqttInit() {
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            client = new MqttClient(broker, clientId, persistence);
            // MQTT 连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName(userName);
            connOpts.setPassword(password.toCharArray());
            // 保留会话
            connOpts.setCleanSession(true);

            // 设置回调
            client.setCallback(new OnMessageCallback());

            // 建立连接
            log.info("Connecting to broker: " + broker);

            client.connect(connOpts);

            log.info("Connected");

            //log.info("Message published");
            //client.disconnect();
            //log.info("Disconnected");
            //client.close();
            //System.exit(0);
        } catch (MqttException me) {
            log.info("reason " + me.getReasonCode());
            log.info("msg " + me.getMessage());
            log.info("loc " + me.getLocalizedMessage());
            log.info("cause " + me.getCause());
            log.info("excep " + me);
            me.printStackTrace();
        }
    }

    public static void publish(String topic, String message) {
        if (client == null) {
            mqttInit();
        }
        // 消息发布所需参数
        MqttMessage _message = new MqttMessage(message.getBytes());
        _message.setQos(0);
        try {
            client.publish(topic, _message);
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }

    public static void subscribe(String subTopic) {
        if (client == null) {
            mqttInit();
        }
        try {
            // 订阅
            client.subscribe(subTopic);
        } catch (MqttException e) {
            throw new RuntimeException(e);
        }
    }
}
